package com.caiya.kafka.spring.config;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.regex.Pattern;

import com.caiya.kafka.KafkaTemplate;
import com.caiya.kafka.listener.BatchMessageListener;
import com.caiya.kafka.listener.MessageListener;
import com.caiya.kafka.listener.MessageListenerContainer;
import com.caiya.kafka.listener.SeekToCurrentErrorHandler;
import com.caiya.kafka.spring.listener.adaptor.*;
import com.caiya.kafka.support.TopicPartitionInitialOffset;
import com.caiya.kafka.support.converter.MessageConverter;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.config.BeanExpressionContext;
import org.springframework.beans.factory.config.BeanExpressionResolver;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.context.expression.BeanFactoryResolver;
import org.springframework.expression.BeanResolver;
import org.springframework.retry.RecoveryCallback;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.Assert;

/**
 * Base model for a Kafka listener endpoint.
 *
 * <p>Holds the endpoint configuration (id, group id, topics / topic partitions / topic
 * pattern, retry and filtering options) and, in
 * {@link #setupListenerContainer(MessageListenerContainer, MessageConverter)}, wraps the
 * listener created by the subclass with the configured retry and filtering adapters
 * before handing it to the container.
 *
 * @param <K> the key type.
 * @param <V> the value type.
 *
 * @author Stephane Nicoll
 * @author Gary Russell
 *
 * @see MethodKafkaListenerEndpoint
 */
public abstract class AbstractKafkaListenerEndpoint<K, V>
        implements KafkaListenerEndpoint, BeanFactoryAware, InitializingBean {

    private String id;

    private String groupId;

    private final Collection<String> topics = new ArrayList<>();

    private Pattern topicPattern;

    private final Collection<TopicPartitionInitialOffset> topicPartitions = new ArrayList<>();

    private BeanFactory beanFactory;

    private BeanExpressionResolver resolver;

    private BeanExpressionContext expressionContext;

    private BeanResolver beanResolver;

    private String group;

    private RecordFilterStrategy<K, V> recordFilterStrategy;

    private boolean ackDiscarded;

    private RetryTemplate retryTemplate;

    private RecoveryCallback<?> recoveryCallback;

    private boolean statefulRetry;

    private boolean batchListener;

    private KafkaTemplate<K, V> replyTemplate;

    private String clientIdPrefix;

    /**
     * Store the bean factory and derive the expression helpers from it.
     * The expression resolver and expression context are only populated when the
     * factory is a {@link ConfigurableListableBeanFactory}; otherwise they stay
     * {@code null}. The bean resolver is always created.
     * @param beanFactory the owning bean factory.
     */
    @Override
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        this.beanFactory = beanFactory;
        if (beanFactory instanceof ConfigurableListableBeanFactory) {
            ConfigurableListableBeanFactory configurableFactory = (ConfigurableListableBeanFactory) beanFactory;
            this.resolver = configurableFactory.getBeanExpressionResolver();
            this.expressionContext = new BeanExpressionContext(configurableFactory, null);
        }
        this.beanResolver = new BeanFactoryResolver(beanFactory);
    }

    /**
     * Return the bean factory set via {@link #setBeanFactory(BeanFactory)}.
     * @return the bean factory, or {@code null} if none has been set.
     */
    protected BeanFactory getBeanFactory() {
        return this.beanFactory;
    }

    /**
     * Return the expression resolver obtained from the bean factory.
     * @return the resolver, or {@code null} if the bean factory is not a
     * {@link ConfigurableListableBeanFactory}.
     */
    protected BeanExpressionResolver getResolver() {
        return this.resolver;
    }

    /**
     * Return the expression context created for the bean factory.
     * @return the expression context, or {@code null} if the bean factory is not a
     * {@link ConfigurableListableBeanFactory}.
     */
    protected BeanExpressionContext getBeanExpressionContext() {
        return this.expressionContext;
    }

    /**
     * Return the bean resolver backed by the bean factory.
     * @return the bean resolver, or {@code null} if no bean factory has been set.
     */
    protected BeanResolver getBeanResolver() {
        return this.beanResolver;
    }

    /**
     * Set the id of this endpoint.
     * @param id the id.
     */
    public void setId(String id) {
        this.id = id;
    }

    @Override
    public String getId() {
        return this.id;
    }

    /**
     * Set the group id to override the configured {@code group.id} property.
     * @param groupId the group id.
     * @since 1.3
     */
    public void setGroupId(String groupId) {
        this.groupId = groupId;
    }

    @Override
    public String getGroupId() {
        return this.groupId;
    }

    /**
     * Set the topics to use, replacing any previously set topics. Either these or
     * 'topicPattern' or 'topicPartitions' should be provided, but not a mixture.
     * @param topics to set; must not be null.
     * @see #setTopicPartitions(TopicPartitionInitialOffset...)
     * @see #setTopicPattern(Pattern)
     */
    public void setTopics(String... topics) {
        Assert.notNull(topics, "'topics' must not be null");
        this.topics.clear();
        this.topics.addAll(Arrays.asList(topics));
    }

    /**
     * Return the topics for this endpoint.
     * @return an unmodifiable view of the topics for this endpoint.
     */
    @Override
    public Collection<String> getTopics() {
        return Collections.unmodifiableCollection(this.topics);
    }

    /**
     * Set the topicPartitions to use, replacing any previously set topicPartitions.
     * Either this or 'topic' or 'topicPattern'
     * should be provided, but not a mixture.
     * @param topicPartitions to set; must not be null.
     * @see #setTopics(String...)
     * @see #setTopicPattern(Pattern)
     */
    public void setTopicPartitions(TopicPartitionInitialOffset... topicPartitions) {
        Assert.notNull(topicPartitions, "'topicPartitions' must not be null");
        this.topicPartitions.clear();
        this.topicPartitions.addAll(Arrays.asList(topicPartitions));
    }

    /**
     * Return the topicPartitions for this endpoint.
     * @return an unmodifiable view of the topicPartitions for this endpoint.
     */
    @Override
    public Collection<TopicPartitionInitialOffset> getTopicPartitions() {
        return Collections.unmodifiableCollection(this.topicPartitions);
    }

    /**
     * Set the topic pattern to use. Cannot be used with
     * topics or topicPartitions.
     * @param topicPattern the pattern
     * @see #setTopicPartitions(TopicPartitionInitialOffset...)
     * @see #setTopics(String...)
     */
    public void setTopicPattern(Pattern topicPattern) {
        this.topicPattern = topicPattern;
    }

    /**
     * Return the topicPattern for this endpoint.
     * @return the topicPattern for this endpoint, or {@code null} if none was set.
     */
    @Override
    public Pattern getTopicPattern() {
        return this.topicPattern;
    }

    @Override
    public String getGroup() {
        return this.group;
    }

    /**
     * Set the group for the corresponding listener container.
     * @param group the group.
     */
    public void setGroup(String group) {
        this.group = group;
    }

    /**
     * Return true if this endpoint creates a batch listener.
     * @return true for a batch listener.
     * @since 1.1
     */
    public boolean isBatchListener() {
        return this.batchListener;
    }

    /**
     * Set to true if this endpoint should create a batch listener.
     * @param batchListener true for a batch listener.
     * @since 1.1
     */
    public void setBatchListener(boolean batchListener) {
        this.batchListener = batchListener;
    }

    /**
     * Set the {@link KafkaTemplate} to use to send replies.
     * @param replyTemplate the template.
     * @since 2.0
     */
    public void setReplyTemplate(KafkaTemplate<K, V> replyTemplate) {
        this.replyTemplate = replyTemplate;
    }

    /**
     * Return the {@link KafkaTemplate} used to send replies.
     * @return the template, or {@code null} if none was set.
     */
    protected KafkaTemplate<K, V> getReplyTemplate() {
        return this.replyTemplate;
    }

    /**
     * Return the configured {@link RecordFilterStrategy}.
     * @return the strategy, or {@code null} if none was set.
     */
    protected RecordFilterStrategy<K, V> getRecordFilterStrategy() {
        return this.recordFilterStrategy;
    }

    /**
     * Set a {@link RecordFilterStrategy} implementation. When set, the listener is
     * wrapped in a filtering adapter during container setup.
     * @param recordFilterStrategy the strategy implementation.
     */
    public void setRecordFilterStrategy(RecordFilterStrategy<K, V> recordFilterStrategy) {
        this.recordFilterStrategy = recordFilterStrategy;
    }

    /**
     * Return the {@code ackDiscarded} flag.
     * @return the ackDiscarded.
     */
    protected boolean isAckDiscarded() {
        return this.ackDiscarded;
    }

    /**
     * Set the {@code ackDiscarded} flag that is handed to the filtering adapter. It is
     * only used when a {@link #setRecordFilterStrategy(RecordFilterStrategy)
     * recordFilterStrategy} is in use.
     * @param ackDiscarded the ackDiscarded.
     */
    public void setAckDiscarded(boolean ackDiscarded) {
        this.ackDiscarded = ackDiscarded;
    }

    /**
     * Return the configured {@link RetryTemplate}.
     * @return the template, or {@code null} if none was set.
     */
    protected RetryTemplate getRetryTemplate() {
        return this.retryTemplate;
    }

    /**
     * Set a retryTemplate. When set, the listener is wrapped in a retrying adapter
     * during container setup.
     * @param retryTemplate the template.
     */
    public void setRetryTemplate(RetryTemplate retryTemplate) {
        this.retryTemplate = retryTemplate;
    }

    /**
     * Return the configured {@link RecoveryCallback}.
     * @return the callback, or {@code null} if none was set.
     */
    protected RecoveryCallback<?> getRecoveryCallback() {
        return this.recoveryCallback;
    }

    /**
     * Set a callback to be used with the {@link #setRetryTemplate(RetryTemplate)
     * retryTemplate}.
     * @param recoveryCallback the callback.
     */
    public void setRecoveryCallback(RecoveryCallback<?> recoveryCallback) {
        this.recoveryCallback = recoveryCallback;
    }

    /**
     * Return true if stateful retry is enabled.
     * @return true for stateful retry.
     */
    protected boolean isStatefulRetry() {
        return this.statefulRetry;
    }

    /**
     * When using a {@link RetryTemplate}, set to true to enable stateful retry. Use in
     * conjunction with a
     * {@link SeekToCurrentErrorHandler} when retry can
     * take excessive time; each failure goes back to the broker, to keep the Consumer
     * alive.
     * @param statefulRetry true to enable stateful retry.
     * @since 2.1.3
     */
    public void setStatefulRetry(boolean statefulRetry) {
        this.statefulRetry = statefulRetry;
    }

    @Override
    public String getClientIdPrefix() {
        return this.clientIdPrefix;
    }

    /**
     * Set the client id prefix; overrides the client id in the consumer configuration
     * properties.
     * @param clientIdPrefix the prefix.
     * @since 2.1.1
     */
    public void setClientIdPrefix(String clientIdPrefix) {
        this.clientIdPrefix = clientIdPrefix;
    }

    /**
     * Verify that exactly one of topics, topicPartitions or topicPattern has been
     * configured.
     * @throws IllegalStateException if none, or more than one, of them is set.
     */
    @Override
    public void afterPropertiesSet() {
        boolean topicsEmpty = getTopics().isEmpty();
        boolean topicPartitionsEmpty = getTopicPartitions().isEmpty();
        if (!topicsEmpty && !topicPartitionsEmpty) {
            throw new IllegalStateException("Topics or topicPartitions must be provided but not both for " + this);
        }
        if (this.topicPattern != null && (!topicsEmpty || !topicPartitionsEmpty)) {
            throw new IllegalStateException("Only one of topics, topicPartitions or topicPattern is allowed for "
                    + this);
        }
        if (this.topicPattern == null && topicsEmpty && topicPartitionsEmpty) {
            throw new IllegalStateException("At least one of topics, topicPartitions or topicPattern must be provided "
                    + "for " + this);
        }
    }

    @Override
    public void setupListenerContainer(MessageListenerContainer listenerContainer, MessageConverter messageConverter) {
        setupMessageListener(listenerContainer, messageConverter);
    }

    /**
     * Create a {@link MessageListener} that is able to serve this endpoint for the
     * specified container.
     * @param container the {@link MessageListenerContainer} to create a {@link MessageListener}.
     * @param messageConverter the message converter - may be null.
     * @return a {@link MessageListener} instance.
     */
    protected abstract MessagingMessageListenerAdapter<K, V> createMessageListener(MessageListenerContainer container,
                                                                                   MessageConverter messageConverter);

    /**
     * Create the listener for this endpoint, decorate it with the retrying adapter
     * (if a retry template is set) and then with the filtering adapter (if a record
     * filter strategy is set), and register the result with the container.
     * @param container the container to register the listener with.
     * @param messageConverter the message converter - may be null.
     * @throws IllegalStateException if no listener was created, or if a retry template
     * is set but the created listener is not a {@link MessageListener}.
     */
    @SuppressWarnings("unchecked")
    private void setupMessageListener(MessageListenerContainer container, MessageConverter messageConverter) {
        Object messageListener = createMessageListener(container, messageConverter);
        Assert.state(messageListener != null, "Endpoint [" + this + "] must provide a non null message listener");
        if (this.retryTemplate != null) {
            // Fail with a descriptive configuration error rather than a bare
            // ClassCastException from the unchecked cast below.
            Assert.state(messageListener instanceof MessageListener,
                    "A 'RetryTemplate' can only be used with a MessageListener, but endpoint [" + this
                            + "] created a " + messageListener.getClass().getName());
            messageListener = new RetryingMessageListenerAdapter<>((MessageListener<K, V>) messageListener,
                    this.retryTemplate, this.recoveryCallback, this.statefulRetry);
        }
        if (this.recordFilterStrategy != null) {
            if (this.batchListener) {
                messageListener = new FilteringBatchMessageListenerAdapter<>(
                        (BatchMessageListener<K, V>) messageListener, this.recordFilterStrategy, this.ackDiscarded);
            }
            else {
                messageListener = new FilteringMessageListenerAdapter<>((MessageListener<K, V>) messageListener,
                        this.recordFilterStrategy, this.ackDiscarded);
            }
        }
        container.setupMessageListener(messageListener);
    }

    /**
     * Return a description for this endpoint, made of the simple class name, the id,
     * the topics, the topicPartitions and the topicPattern.
     * <p>Available to subclasses, for inclusion in their {@code toString()} result.
     * @return a description for this endpoint.
     */
    protected StringBuilder getEndpointDescription() {
        StringBuilder result = new StringBuilder();
        return result.append(getClass().getSimpleName()).append("[").append(this.id).
                append("] topics='").append(this.topics).
                append("' | topicPartitions='").append(this.topicPartitions).
                append("' | topicPattern='").append(this.topicPattern).append("'");
    }

    @Override
    public String toString() {
        return getEndpointDescription().toString();
    }

}
